package hotnet.processor;

import hotnet.buffer.IoBuffer;
import hotnet.common.NamedRunnable;
import hotnet.common.SessionState;
import hotnet.config.IoSessionConfig;
import hotnet.exception.ExceptionMonitor;
import hotnet.exception.WriteToCloseSessionException;
import hotnet.filter.IoFilterChain;
import hotnet.future.ConstantFutureMessage;
import hotnet.future.DefaultIoFuture;
import hotnet.future.WriteFuture;
import hotnet.listener.IoServiceListenerManager;
import hotnet.session.AbstractIoSession;
import hotnet.session.IoSession;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link IoProcessor} implementations that are driven by a
 * polling loop.
 *
 * <p>Sessions are handed over through three concurrent queues (new, flushing,
 * removing). A single {@code ProcessorRunnable}, submitted to the configured
 * {@link Executor}, repeatedly calls {@link #select(long)}, initializes queued
 * sessions, processes the selected ones, flushes pending writes and removes
 * closed sessions. Subclasses supply the actual selection and I/O primitives
 * through the abstract methods declared at the bottom of this class.
 */
public abstract class AbstractPollingIoProcessor implements IoProcessor {
	// NOTE: the logger is registered under IoProcessor, not under this class.
	private final static Logger logger = LoggerFactory.getLogger(IoProcessor.class);
	// Per-concrete-class counter used by nextThreadName() to number processor threads.
	private final static ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
	
	// Timeout handed to select(long) on every loop iteration (presumably milliseconds -- confirm in subclass).
	private static final long SELECT_TIMEOUT = 1000L;
	// Threshold the loop's spurious-wake-up counter is compared against before registerNewSelector() is called.
	private static final int MAX_BUG_RUN = 10;
	
	// Executor that runs the polling loop.
	private final Executor executor;
	// Name given to the thread running the polling loop.
	private final String threadName;
	
	// Sessions waiting to be initialized by the polling loop.
	private final Queue<IoSession> newSessions = new ConcurrentLinkedQueue<IoSession>();
	// Sessions waiting to be destroyed by the polling loop.
	private final Queue<IoSession> removingSessions = new ConcurrentLinkedQueue<IoSession>();
	// Sessions with pending write requests waiting to be flushed by the polling loop.
	private final Queue<IoSession> flushingSessions = new ConcurrentLinkedQueue<IoSession>();
	
	// The currently registered polling loop, or null when none is registered.
	private final AtomicReference<ProcessorRunnable> processorRef = new AtomicReference<ProcessorRunnable>();
	
	// Guards setting the disposing flag and the doDispose() call made when the loop exits.
	private final Object disposalLock = new Object();
    // Set once dispose() has been requested.
    private volatile boolean disposing;
    // Set once dispose() has finished waiting for the disposal future.
    private volatile boolean disposed;
    // Completed by the polling loop when it exits; dispose() blocks on it.
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
    
    // Read by the polling loop to tell a requested wake-up from an unexpected early return of select().
    // Presumably set to true by the subclass' wakeup() implementation -- not visible in this file.
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
	
	/**
	 * Creates a processor whose polling loop will be executed by the given executor.
	 * The name of the loop thread is derived from the concrete class name.
	 *
	 * @param executor executor used to run the polling loop once it is started
	 */
	public AbstractPollingIoProcessor(Executor executor) {
		this.threadName = nextThreadName();
		this.executor = executor;
	}
	
	/**
	 * Builds the thread name for this processor in the form
	 * {@code <SimpleClassName>-<n>}, where {@code n} is a per-class sequence
	 * number starting at 1.
	 *
	 * @return the generated thread name
	 */
	private String nextThreadName() {
		Class<?> type = getClass();
		AtomicInteger existing = threadIds.putIfAbsent(type, new AtomicInteger(1));

		// The first instance of a class installs the counter and takes id 1;
		// every later instance bumps the shared counter.
		int id = (existing == null) ? 1 : existing.incrementAndGet();

		return type.getSimpleName() + "-" + id;
	}
	
    /**
     * Tells whether {@link #dispose()} has been requested.
     *
     * @return the current value of the {@code disposing} flag
     */
    public final boolean isDisposing() {
        return disposing;
    }
    
    /**
     * Tells whether {@link #dispose()} has completed.
     *
     * @return the current value of the {@code disposed} flag
     */
    public final boolean isDisposed() {
        return disposed;
    }
    
    /**
     * Requests disposal of this processor and blocks until the polling loop
     * has completed the disposal future. Does nothing when disposal has
     * already been requested or finished.
     */
    public final void dispose() {
        boolean alreadyRequested = disposed || disposing;

        if (!alreadyRequested) {
            synchronized (disposalLock) {
                disposing = true;
                // Make sure a loop is running (and awake) so it can notice the flag.
                startupProcessor();
            }

            disposalFuture.awaitUninterruptibly();
            disposed = true;
        }
    }
    
    /**
     * Starts a polling loop on the executor when none is registered, then
     * wakes the loop up.
     */
    private void startupProcessor() {
        if (processorRef.get() == null) {
            ProcessorRunnable candidate = new ProcessorRunnable();

            // Only the caller that wins the CAS submits its runnable.
            if (processorRef.compareAndSet(null, candidate)) {
                executor.execute(new NamedRunnable(candidate, threadName));
            }
        }

        wakeup();
    }
    
    /**
     * Queues a session for registration with this processor.
     *
     * <p>Delegates to {@link #add(IoSession)} so that the polling loop is
     * started (or woken up) right away. Merely enqueuing the session would
     * leave it unprocessed until the next select timeout, or indefinitely
     * when no loop is running.
     *
     * @param session the session to register
     * @throws IllegalStateException if the processor is disposing or disposed
     */
    public void addSession(IoSession session) {
        add(session);
    }
    
    /**
     * Initializes a newly queued session and notifies the service listeners
     * that it has been created.
     *
     * <p>If initialization or the listener notification fails, the failure is
     * reported to the {@link ExceptionMonitor} and the session is destroyed.
     * A failure of that clean-up is reported as well.
     *
     * @param session the session to register
     * @return {@code true} when the session was initialized and announced,
     *         {@code false} when it failed and was destroyed
     */
    private boolean addNow(IoSession session) {
        try {
            logger.info(session.toString()+ "added session to processor!");
            init(session);

            IoServiceListenerManager manager = session.getService().getListenerManager();
            manager.fireSessionCreate(session);

            return true;
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);

            try {
                destroy(session);
            } catch (Exception e1) {
                // Report the clean-up failure itself, not the original cause again.
                ExceptionMonitor.getInstance().exceptionCaught(e1);
            }

            return false;
        }
    }
    
    /**
     * Drains the queue of new sessions and registers each of them.
     *
     * @return the number of sessions that were registered successfully
     */
    private int handleNewSessions() {
        int registered = 0;

        for (IoSession pending = newSessions.poll(); pending != null; pending = newSessions.poll()) {
            if (addNow(pending)) {
                registered++;
            }
        }

        return registered;
    }
    
    /**
     * Fails the session's pending writes, destroys the session and notifies
     * the service listeners that it is gone.
     *
     * <p>The write queue is cleared both before and after the destroy call,
     * and the destroyed event is fired whether or not destroying succeeded.
     *
     * @param session the session to remove
     * @return {@code true} when {@code destroy} completed without exception
     */
    private boolean removeNow(IoSession session) {
        boolean destroyed = false;

        clearWriteRequestQueue(session);

        try {
            destroy(session);
            destroyed = true;
        } catch (Exception e) {
            session.getFilterChain().fireExceptionCaught(e);
        } finally {
            clearWriteRequestQueue(session);
            session.getService().getListenerManager().fireSessionDestroyed(session);
        }

        return destroyed;
    }
    
    /**
     * Empties the session's write queue when the session is being removed.
     *
     * <p>If the request at the head of the queue is an {@link IoBuffer} with
     * nothing left to write, it is reported to the filter chain as sent.
     * Every other request, including a head buffer that still has unwritten
     * bytes, is failed with a {@link WriteToCloseSessionException}, which is
     * also forwarded to the filter chain once.
     *
     * @param session the session whose pending writes are discarded
     */
    private void clearWriteRequestQueue(IoSession session) {
        logger.info("session:" + session.getSessionId() + " clearWriteRequestQueue!");

        AbstractIoSession s = (AbstractIoSession) session;
        Queue<WriteFuture> queue = s.getWriteFutureQueue();
        List<WriteFuture> failedRequests = new ArrayList<WriteFuture>();

        WriteFuture firstFuture = queue.poll();
        if (firstFuture != null) {
            Object msg = firstFuture.getMessage();

            if (msg instanceof IoBuffer && !((IoBuffer) msg).hasRemaining()) {
                logger.info("session:" + session.getSessionId() + " fireMessageSent send first remain msg!");

                session.getFilterChain().fireMessageSent(firstFuture);
            } else {
                // Non-buffer messages and buffers with unwritten bytes can no
                // longer be delivered; fail them so their futures complete.
                failedRequests.add(firstFuture);
            }
        }

        WriteFuture future;
        while ((future = queue.poll()) != null) {
            failedRequests.add(future);
        }

        if (!failedRequests.isEmpty()) {
            WriteToCloseSessionException cause = new WriteToCloseSessionException(failedRequests);

            for (WriteFuture r : failedRequests) {
                r.setException(cause);
            }

            session.getFilterChain().fireExceptionCaught(cause);
        }
    }
    
    /**
     * Drains the queue of sessions scheduled for removal.
     *
     * <p>Opened sessions are removed directly. Sessions that are still
     * opening are first taken out of the new-session queue. Sessions that are
     * already closing are skipped.
     *
     * @return the number of sessions that were destroyed successfully
     * @throws IllegalStateException if a session reports any other state
     */
    private int removeSessions() {
        int removed = 0;
        IoSession session;

        while ((session = removingSessions.poll()) != null) {
            logger.info("session:" + session.getSessionId() + " poll from removing queue!");
            SessionState state = getState(session);

            switch (state) {
            case OPENING:
                // Not initialized yet: make sure it is never picked up as new.
                newSessions.remove(session);
                // fall through
            case OPENED:
                if (removeNow(session)) {
                    removed++;
                }
                break;
            case CLOSING:
                break;
            default:
                throw new IllegalStateException(String.valueOf(state));
            }
        }

        return removed;
    }
    
    /**
     * Processes every session returned by {@link #selectedSessions()} and
     * removes each one from the iterator once it has been handled.
     *
     * @throws Exception declared for subclasses' benefit; propagated to the polling loop
     */
    private void processSelected() throws Exception {
        Iterator<IoSession> selected = selectedSessions();

        while (selected.hasNext()) {
            IoSession session = selected.next();

            process(session);
            selected.remove();
        }
    }
    
    /**
     * Reads available data from the session into a freshly allocated buffer
     * of the configured read-buffer size and forwards it to the filter chain.
     *
     * <p>Reading stops when a read returns zero or a negative value, or when
     * the buffer is full. The input-closed event is fired when the last read
     * returned a negative value or when nothing was read at all. Any
     * exception schedules the session for removal and is forwarded to the
     * filter chain.
     *
     * @param session the readable session
     */
    private void read(IoSession session) {
        int capacity = session.getSessionConfig().getReadBufferSize();
        IoBuffer buffer = IoBuffer.allocate(capacity);
        int total = 0;
        int last;

        try {
            for (;;) {
                last = read(session, buffer);
                if (last <= 0) {
                    break;
                }

                total += last;

                if (!buffer.hasRemaining()) {
                    break;
                }
            }

            buffer.flip();

            if (total > 0) {
                session.getFilterChain().fireMessageReceived(buffer);
            }

            if (last < 0 || total == 0) {
                session.getFilterChain().fireInputClosed();
            }
        } catch (Exception e) {
            scheduleRemove(session);
            session.getFilterChain().fireExceptionCaught(e);
        }
    }
    
    /**
     * Handles one selected session: reads from it when it is readable and
     * schedules a flush when it is writable.
     *
     * <p>The two conditions are checked independently. A session that is
     * readable and writable at the same time must still get its pending
     * writes flushed, otherwise a steady stream of incoming data would
     * starve its outgoing data.
     *
     * @param session the selected session
     */
    private void process(IoSession session) {
        if (isReadable(session)) {
            read(session);
        }

        if (isWritable(session)) {
            scheduleFlush((AbstractIoSession) session);
        }
    }
    
    /**
     * Logs the request and queues the session for removal by the polling loop.
     *
     * @param session the session to remove
     */
    private void scheduleRemove(IoSession session) {
        String note = "session:" + session.getSessionId() + " add to removing queue!";

        logger.info(note);
        removingSessions.add(session);
    }
    
    /**
     * Queues the session for flushing unless {@code setScheduledForFlush(true)}
     * reports that it should not be queued (it returns {@code false}).
     *
     * @param session the session with pending writes
     */
    private void scheduleFlush(AbstractIoSession session) {
        boolean newlyScheduled = session.setScheduledForFlush(true);

        if (!newlyScheduled) {
            return;
        }

        flushingSessions.add(session);
    }
    
    /**
     * Drains the queue of sessions scheduled for flushing and writes their
     * pending requests.
     *
     * <p>Opened sessions are flushed. When {@code flushNow} returns
     * {@code false} and requests remain, the session is scheduled again.
     * Closing sessions are skipped. Sessions that are still opening cannot be
     * written to yet; they are collected and re-scheduled only after the
     * queue has been drained. Re-queuing them inside the loop would make the
     * loop poll the same session again immediately, and because session
     * initialization happens on this same thread the state could never
     * change, so the loop would never terminate.
     */
    private void flush() {
        if (flushingSessions.isEmpty()) {
            return;
        }

        List<AbstractIoSession> notReady = new ArrayList<AbstractIoSession>();
        IoSession session;

        while ((session = flushingSessions.poll()) != null) {
            AbstractIoSession s = (AbstractIoSession) session;

            s.unscheduledForFlush();

            SessionState state = getState(s);
            switch (state) {
            case OPENED:
                try {
                    boolean flushedAll = flushNow(s);

                    if (!flushedAll && !s.getWriteFutureQueue().isEmpty()) {
                        scheduleFlush(s);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireExceptionCaught(e);
                }

                break;
            case CLOSING:
                break;

            case OPENING:
                // Retry on a later loop iteration, once the session is initialized.
                notReady.add(s);
                break;

            default:
                break;
            }
        }

        for (AbstractIoSession deferred : notReady) {
            scheduleFlush(deferred);
        }
    }
    
    /**
     * Writes the request at the head of the session's write queue.
     *
     * <p>At most one buffer is written per call:
     * <ul>
     * <li>Buffer written completely: the request is removed from the queue,
     *     the buffer is freed and {@code false} is returned so the caller can
     *     schedule the session again for the next request.</li>
     * <li>Buffer written only partially: the request stays at the head of
     *     the queue so the remaining bytes are sent later, write interest is
     *     enabled and {@code true} is returned because nothing more can be
     *     done until the session becomes writable again.</li>
     * <li>Close request: the close is propagated to the filter chain and
     *     every request queued behind it is failed with a
     *     {@link WriteToCloseSessionException}.</li>
     * </ul>
     * A disconnected session is scheduled for removal instead. Any exception
     * fails the current request, removes it from the queue and is forwarded
     * to the filter chain.
     *
     * @param session the session to flush
     * @return {@code false} when the caller should schedule another flush if
     *         requests remain (or when the flush failed), {@code true} when
     *         no immediate re-flush is needed
     */
    private boolean flushNow(AbstractIoSession session) {
        if (!session.isConnected()) {
            scheduleRemove(session);
            return false;
        }

        final Queue<WriteFuture> writeRequestQueue = session.getWriteFutureQueue();
        WriteFuture req = null;

        try {
            // Write interest is re-enabled below only if a buffer cannot be fully written.
            setInterestedInWrite(session, false);

            while ((req = writeRequestQueue.peek()) != null) {
                Object msg = req.getMessage();

                if (msg instanceof IoBuffer) {
                    IoBuffer buf = (IoBuffer) msg;

                    writeBuffer(session, req);

                    if (buf.hasRemaining()) {
                        // Partial write: keep the request queued and wait for
                        // the session to become writable again.
                        setInterestedInWrite(session, true);
                        return true;
                    }

                    writeRequestQueue.poll();
                    buf.free();
                    return false;
                }

                // Not a buffer: consume the request before handling it.
                writeRequestQueue.poll();

                if (msg == ConstantFutureMessage.closeObject) {
                    IoFilterChain chain = session.getFilterChain();
                    chain.fireFilterClose();

                    // discard write requests queued after the close request
                    List<WriteFuture> failedRequests = new ArrayList<WriteFuture>();
                    WriteFuture late;

                    while ((late = writeRequestQueue.poll()) != null) {
                        failedRequests.add(late);
                    }

                    if (!failedRequests.isEmpty()) {
                        WriteToCloseSessionException exception =
                                new WriteToCloseSessionException(failedRequests, "write after close request!");

                        for (WriteFuture f : failedRequests) {
                            f.setException(exception);
                        }

                        IoFilterChain filterChain = session.getFilterChain();
                        filterChain.fireExceptionCaught(exception);
                    }

                    break;
                }

                throw new IllegalArgumentException("unknown msg: " + msg.getClass().getName());
            }
        } catch (Exception e) {
            if (req != null) {
                // A failed request must not be retried on the next flush.
                writeRequestQueue.remove(req);
                req.setException(e);
            }

            IoFilterChain chain = session.getFilterChain();
            chain.fireExceptionCaught(e);

            return false;
        }

        return true;
    }
    
    /**
     * Writes the remaining bytes of the request's buffer to the session.
     *
     * <p>When the buffer has nothing left to write afterwards, the
     * message-sent event is fired with the buffer reset to its mark, and the
     * buffer position is restored once the event has been delivered.
     *
     * <p>An {@link IOException} during the write frees the buffer, closes and
     * destroys the session and fails the request with that exception, so
     * that anyone waiting on the future is released instead of waiting for a
     * write that will never complete.
     *
     * @param session the session to write to
     * @param req the write request; its message must be an {@link IoBuffer}
     * @return the number of bytes written, {@code 0} when the write failed
     *         with an {@code IOException}
     * @throws Exception if writing, closing or destroying fails in any other way
     */
    private int writeBuffer(AbstractIoSession session, WriteFuture req) throws Exception {
        IoBuffer buffer = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;

        if (buffer.hasRemaining()) {
            try {
                localWrittenBytes = write(session, buffer, buffer.remaining());
            } catch (IOException ioe) {
                buffer.free();
                session.close();
                destroy(session);

                // Do not swallow the failure: complete the future with it.
                req.setException(ioe);

                return 0;
            }
        }

        if (!buffer.hasRemaining()) {
            int pos = buffer.position();

            buffer.reset();

            session.getFilterChain().fireMessageSent(req);

            buffer.position(pos);
        }

        return localWrittenBytes;
    }
    
    /**
     * Appends a write request to the session's write queue and schedules the
     * session for flushing.
     *
     * @param session the session to write to; must be an {@link AbstractIoSession}
     * @param future the write request to enqueue
     */
    public void write(IoSession session, WriteFuture future) {
        AbstractIoSession target = (AbstractIoSession) session;

        target.getWriteFutureQueue().offer(future);
        this.flush(target);
    }
    
    /**
     * Schedules the session for flushing and wakes up the polling loop.
     * Nothing happens when the session is already scheduled.
     *
     * @param session the session to flush; must be an {@link AbstractIoSession}
     */
    public final void flush(IoSession session) {
        AbstractIoSession target = (AbstractIoSession) session;

        // Only the caller that flips the flag enqueues the session.
        if (!target.setScheduledForFlush(true)) {
            return;
        }

        flushingSessions.add(session);
        wakeup();
    }
    
    /**
     * Queues a session for registration and makes sure the polling loop is
     * running and awake.
     *
     * @param session the session to register
     * @throws IllegalStateException if the processor is disposing or disposed
     */
    public final void add(IoSession session) {
        boolean shuttingDown = disposed || disposing;

        if (shuttingDown) {
            throw new IllegalStateException("Already disposed.");
        }

        newSessions.add(session);
        startupProcessor();
    }
    
    /**
     * Schedules the session for removal and makes sure the polling loop is
     * running and awake so that the removal is carried out.
     *
     * @param session the session to remove
     */
    public final void remove(IoSession session) {
        scheduleRemove(session);
        startupProcessor();
    }
    
    /** Prepares a new session for polling; called by the loop before the session-created event is fired. */
    protected abstract void init(IoSession session) throws Exception;
    /** Makes a blocked {@link #select(long)} return; called after sessions are queued for adding, flushing or removal. */
    protected abstract void wakeup(); 
    /** Called by the loop when select() keeps returning early without reason; presumably replaces the underlying selector -- confirm in subclass. */
    protected abstract void registerNewSelector() throws IOException;
    /** Waits up to {@code timeout} for ready sessions; a result greater than zero makes the loop process {@link #selectedSessions()}. */
    protected abstract int select(long timeout) throws Exception;
    /** Iterator over all sessions managed by this processor (not used within this class). */
    protected abstract Iterator<IoSession> allSessions();
    /** Iterator over the sessions selected by the last select(); the loop calls {@code remove()} on it after each session. */
    protected abstract Iterator<IoSession> selectedSessions();
    /** Tells whether the selected session is ready for writing. */
    protected abstract boolean isWritable(IoSession session);
    /** Tells whether the selected session is ready for reading. */
    protected abstract boolean isReadable(IoSession session);
    /** Enables or disables write-readiness notification for the session. */
    protected abstract void setInterestedInWrite(IoSession session, boolean isInterested) throws Exception;
    /** Enables or disables read-readiness notification for the session (not used within this class). */
    protected abstract void setInterestedInRead(IoSession session, boolean isInterested) throws Exception;
    /** Tells whether read-readiness notification is enabled for the session (not used within this class). */
    protected abstract boolean isInterestedInRead(IoSession session);
    /** Tells whether write-readiness notification is enabled for the session (not used within this class). */
    protected abstract boolean isInterestedInWrite(IoSession session);
    /** Reads into {@code buf}; a positive result is counted as bytes read, a negative result is treated as closed input. */
    protected abstract int read(IoSession session, IoBuffer buf) throws Exception;
    /** Writes from {@code buf}; callers pass {@code buf.remaining()} as {@code length} and treat the result as bytes written. */
    protected abstract int write(IoSession session, IoBuffer buf, int length) throws Exception;
    /** Consulted when select() returned nothing almost immediately without a requested wake-up. */
    protected abstract boolean isBrokenConnection() throws IOException;
    /** Releases the processor's resources; called under the disposal lock when the loop exits while disposing. */
    protected abstract void doDispose() throws Exception;
    /** Tells whether no session is registered any more; used to decide whether the loop may exit. */
    protected abstract boolean isSelectorEmpty();
    /** Releases the session's I/O resources; called on removal, on failed registration and on write failure. */
    protected abstract void destroy(IoSession session) throws Exception;
    /** Reports the session's lifecycle state; this class handles OPENING, OPENED and CLOSING. */
    protected abstract SessionState getState(IoSession session);
    
	/**
	 * The polling loop. Each iteration selects, registers new sessions,
	 * processes the selected ones, flushes pending writes and removes closed
	 * sessions. The loop exits once no session is left (or the selector has
	 * been closed), disposes the processor if disposal was requested, and
	 * always completes the disposal future.
	 */
	private class ProcessorRunnable implements Runnable {
		public void run() {
			int nSession = 0;
			// Consecutive select() calls that returned early with nothing selected
			// and without a requested wake-up.
			int nBugRun = 0;
			Thread.currentThread().setName(threadName);
			for (;;) {
				try {
					long t0 = System.currentTimeMillis();
					int selected = select(SELECT_TIMEOUT);
					long delta = System.currentTimeMillis() - t0;

					// Consume the wake-up flag so that an old wake-up does not
					// mask later spurious returns. Assumes the subclass' wakeup()
					// sets this flag -- confirm in subclass.
					boolean wokenUp = wakeupCalled.getAndSet(false);

					if (selected == 0 && !wokenUp && delta < 100) {
						if (isBrokenConnection()) {
							nBugRun = 0;

							continue;
						}

						// Count the spurious return; after MAX_BUG_RUN in a row
						// ask the subclass for a new selector.
						if (++nBugRun >= MAX_BUG_RUN) {
							registerNewSelector();
							nBugRun = 0;
						}
					} else {
						nBugRun = 0;
					}

					nSession += handleNewSessions();

					if (selected > 0) {
						processSelected();
					}

					flush();

					nSession -= removeSessions();

					if (nSession == 0) {
						processorRef.set(null);

						if (newSessions.isEmpty() && isSelectorEmpty()) {
							break;
						}

						// Work arrived in the meantime: keep running unless a
						// concurrent startupProcessor() already installed another loop.
						if (!processorRef.compareAndSet(null, this)) {
							break;
						}
					}

				} catch (ClosedSelectorException cse) {
					// If the selector has been closed, we can exit the loop
					// But first, dump a stack trace
					ExceptionMonitor.getInstance().exceptionCaught(cse);
					break;
				} catch (Exception e) {
					ExceptionMonitor.getInstance().exceptionCaught(e);

					// Back off briefly so a persistent failure does not spin.
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e1) {
						ExceptionMonitor.getInstance().exceptionCaught(e1);
					}
				}
			}

			try {
				synchronized (disposalLock) {
					if (disposing) {
						doDispose();
					}
				}
			} catch (Exception e) {
				ExceptionMonitor.getInstance().exceptionCaught(e);
			} finally {
				disposalFuture.setValue(true);
			}
		}
	}
}
